package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

/**
 * Demo job that consumes the strings emitted by the custom [[Demo3MysqlSource]],
 * splits each one on commas, and prints a rolling count per distinct value of
 * the fifth field.
 */
object Demo4TestMysql {

  /**
   * Builds the streaming pipeline and submits it for execution.
   *
   * @param args command-line arguments; not used by this job
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Use the custom MySQL source.
    val mysqlDS: DataStream[String] = env.addSource(new Demo3MysqlSource)

    mysqlDS
      // Keep only the fifth comma-separated field (index 4). A record with
      // fewer than five fields throws ArrayIndexOutOfBoundsException here and
      // fails the job; note that String.split drops trailing empty fields.
      .map(_.split(",")(4))
      // Pair every value with an initial count of 1.
      .map((_, 1))
      // Hash-partition by the field value so that equal keys are routed to the
      // same task. A key selector is used instead of the deprecated positional
      // keyBy(0): it is checked at compile time and yields a String-typed key.
      .keyBy(_._1)
      // Rolling sum of the count (tuple position 1) per key.
      .sum(1)
      .print()

    // Nothing runs until the job graph is submitted.
    env.execute()
  }

}
